 1.Spark 原理之Master & Worker解析下Spark RPC 框架
   
   RPC(Remote Procedure Call)远程过程调用。两台服务器A、B，A服务器上的应用，想要调用B服务器上应用提供
的函数/方法，由于不在一个内存空间，不能直接调用，需要通过网络来表达调用的语义和传达调用的数据。
       RPC接口。让使用者调用远程请求时，就像调用本地函数
       序列化、反序列化
       网络传输
   如果把分布式系统（Hadoop、Spark等）比作一个人，那么RPC可以认为是人体的血液循环系统。它将系统中各个
不同的组件联系了起来。在Spark中，不同组件之间的通信、jar的上传、Shuffle数据的传输、Block数据的复制与备
份都是基于RPC来实现的，所以说 RPC 是分布式系统的基石毫不为过。
   在 Spark 2.x 之前，RPC框架是借助 Akka 来实现的。Akka 是非常优秀的开源分布式框架。弃用 Akka，主要是解决
用户的Spark Application 中 Akka 版本和 Spark 内置的 Akka版本冲突的问题。
   Spark 2.X 的 RPC 框架是基于优秀的网络通信框架 Netty 开发的。Spark RPC借鉴了Akka的中的设计，它是基于
Actor模型，各个组件可以认为是一个个独立的实体，各个实体之间通过消息来进行通信。具体各个组件之间的关系
图如下：
   1).RpcEnv
   RpcEnv是RPC的环境对象，管理着整个 RpcEndpoint 的生命周期，其主要功能有：根据name或uri注册
endpoints、管理各种消息的处理、停止endpoints。其中RpcEnv只能通过RpcEnvFactory创建得到。 RpcEnv中的
核心方法：
   // RpcEndpoint 向 RpcEnv 注册
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

   // 根据参数信息，从 RpcEnv 中获得一个远程的RpcEndpoint
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
   2).RpcEndpoint
   RpcEndpoint：表示一个消息通信体，可以接收、发送、处理消息。
   RpcEndpoint的生命周期为：constructor -> onStart -> receive* -> onStop，其中：
       onStart在接收任务消息前调用主要用来执行初始化
       receive 和 receiveAndReply 分别用来接收RpcEndpoint send 或 ask 过来的消息
	       receive方法，接收由RpcEndpointRef.send方法发送的消息，该类消息不需要进行响应消息（Reply），
而只是在RpcEndpoint端进行处理
           receiveAndReply方法，接收由RpcEndpointRef.ask发送的消息，RpcEndpoint端处理完消息后，需要给
调用RpcEndpointRef.ask的通信端响应消息
       send发送的消息不需要立即处理，ask发送的消息需要立即处理
private[spark] trait RpcEndpoint {
 /**
  * 当前RpcEndpoint所注册的[[RpcEnv]]
  */
 val rpcEnv: RpcEnv
 /**
  * 当前[[RpcEndpoint]]的代理，当`onStart`方法被调用时`self`生效，当`onStop`被调用时，`self`变成
null
  * 注意：在`onStart`方法被调用之前，[[RpcEndpoint]]对象还未进行注册，所以就没有有效的
[[RpcEndpointRef]]
  */
 final def self: RpcEndpointRef = {
   require(rpcEnv != null, "rpcEnv has not been initialized")
   rpcEnv.endpointRef(this)
 }
 /**
  * 用于处理从`RpcEndpointRef.send` 或 `RpcCallContext.reply`接收到的消息。
  * 如果接收到一个不匹配的消息，将会抛出SparkException异常，并发送给`onError`
  *
  * 通过上面的receive方法，接收由RpcEndpointRef.send方法发送的消息，
  * 该类消息不需要进行响应消息（Reply），而只是在RpcEndpoint端进行处理
  */
 def receive: PartialFunction[Any, Unit] = {
   case _ => throw new SparkException(self + " does not implement 'receive'")
 }
 /**
  * 处理来自`RpcEndpointRef.ask`的消息，RpcEndpoint端处理完消息后，需要给调用RpcEndpointRef.ask
的通信端返回响应消息
  */
 def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
   case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
 }
 /**
  * 在处理消息期间出现异常的话将被调用
  */
 def onError(cause: Throwable): Unit = {
   // By default, throw e and let RpcEnv handle it
   throw cause
 }
 /**
  * 当有远端连接到当前服务器时会被调用
  */
 def onConnected(remoteAddress: RpcAddress): Unit = {
   // By default, do nothing.
 }
 /**
  * 当远端与当前服务器断开时，该方法会被调用
  */
 def onDisconnected(remoteAddress: RpcAddress): Unit = {
   // By default, do nothing.
 }
 /**
  * 当前节点与远端之间的连接发生错误时，该方法将会被调用
  */
 def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
   // By default, do nothing
 }
 /**
  * 在 [[RpcEndpoint]] 开始处理消息之前被调用
  */
 def onStart(): Unit = {
   // By default, do nothing
 }
 /**
  * 当[[RpcEndpoint]]正在停止时，该方法将会被调用
  * `self`将会在该方法中被置位null，因此你不能使用它来发送消息
  */
 def onStop(): Unit = {
   // By default, do nothing
 }
 /**
  * A convenient method to stop [[RpcEndpoint]].
  */
 final def stop(): Unit = {
   val _self = self
   if (_self != null) {
     rpcEnv.stop(_self)
   }
 }
}
   3).RpcEndPointRef
   RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当需要向一个具体的RpcEndpoint发送消息时，需要获取到该
RpcEndpoint的引用，然后通过该引用发送消息。
       send 方法发送消息后不等待响应，亦即 Send-and-forget
       ask 方法发送消息后需要等待通信对端给予响应，通过 Future 来异步获取响应结果
private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with
Logging {
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
    
 /**
  * 返回[RpcEndpointRef]]的引用的远端服务器地址
  */
  def address: RpcAddress
  def name: String
 /**
  * 发送一条单向的异步消息，并且发送消息后不等待响应，亦即Send-and-forget
  */
  def send(message: Any): Unit
  /**
  * 发送消息给相关的[[RpcEndpoint.receiveAndReply]]，并且返回一个 Future，能够在timeout时间内接
收回复
  * 该方法只会发送一次消息，失败后不重试
  * 而ask方法发送消息后需要等待通信对端给予响应，通过Future来异步获取响应结果
  */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
  /**
  * 发送消息给相关的[[RpcEndpoint.receiveAndReply]]，并且返回一个 Future，能够在
defaultAskTimeout时间内接收回复
  * 该方法只会发送一次消息，失败后不重试
  * 而ask方法发送消息后需要等待通信对端给予响应，通过Future来异步获取响应结果
  */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
 /**
  * 发送消息给相关的[[RpcEndpoint.receiveAndReply)]]，并且返回一个Future，能够在
defaultAskTimeout时间内接收回复，如果超时则抛出异常
  * 注意：该方法会阻塞当前线程
  *
  * @param message the message to send
  * @tparam T type of the reply message
  * @return the reply message from the corresponding [[RpcEndpoint]]
  */
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
 /**
  * 发送消息给相关的[[RpcEndpoint.receiveAndReply)]]，并且返回一个Future，能够在timeout时间内接
收回复，如果超时则抛出异常
  * 注意：该方法会阻塞当前线程
  *
  * @param 发送的消息内容
  * @param 超时时长
  * @tparam 响应消息的类型
  * @return 从[[RpcEndpoint]]端响应的消息内容
  */
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
 }
}
   
   4).其他组件
      Dispatcher，消息分发器。针对RpcEndpoint发送/接收到的消息，分发至对应的指令收件箱/发件箱。如果指令
接收方是自己存入收件箱，如果指令接收方为非自身端点，则放入发件箱
      Inbox，指令消息收件箱。一个本地端点对应一个收件箱，Dispatcher在每次向Inbox存入消息时，将对应
EndpointData 加入内部待Receiver Queue中，另外Dispatcher创建时会启动一个单独线程进行轮询Receiver
Queue，进行收件箱消息消费
      OutBox，指令消息发件箱。一个远程端点对应一个发件箱，当消息放入Outbox后，紧接着将消息通过
TransportClient 发送出去
      TransportClient，Netty通信客户端。根据OutBox消息的receiver信息，请求对应远程TransportServer
      TransportServer，Netty通信服务端。一个RPC端点一个TransportServer，接收远程消息后调用Dispatcher分
发消息至对应收发件箱   